/*
 * Copyright (C) 2011 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package com.ksvip.next.components.redis.filter;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import java.io.Serializable;
import java.util.List;

import javax.annotation.Nullable;

import redis.clients.jedis.JedisPool;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Predicate;
import com.google.common.hash.Funnel;

/**
 * A Bloom filter for instances of {@code T} whose bit set is a
 * {@link RedisBitmaps} built from a {@link JedisPool} and a key. A Bloom filter
 * offers an approximate containment test with one-sided error: if it claims
 * that an element is contained in it, this might be in error, but if it claims
 * that an element is <i>not</i> contained in it, then this is definitely true.
 * <p>
 * If you are unfamiliar with Bloom filters, this nice <a
 * href="http://llimllib.github.com/bloomfilter-tutorial/">tutorial</a> may help
 * you understand how they work.
 * <p>
 * The false positive probability ({@code FPP}) of a bloom filter is defined as
 * the probability that {@linkplain #mightContain(Object)} will erroneously
 * return {@code true} for an object that has not actually been put in the
 * {@code BloomFilter}.
 * <p>
 * This class is declared {@link Serializable}. NOTE(review): actually
 * serializing an instance additionally requires the {@link RedisBitmaps} it
 * holds to be serializable, which cannot be confirmed from this file. Unlike
 * the Guava class this code derives from, no compact {@code writeTo} /
 * {@code readFrom} serial form is provided here.
 *
 * @param <T> the type of instances that the {@code BloomFilter} accepts
 * @author Dimitris Andreou
 * @author Kevin Bourrillion
 * @since 11.0
 */
public final class RedisBloomFilter<T> implements Predicate<T>, Serializable {
    /** Serialization version identifier of this class. */
    private static final long serialVersionUID = 8872321086070566868L;

    /**
     * A strategy to translate T instances, to {@code numHashFunctions} bit
     * indexes.
     * <p>
     * Implementations should be collections of pure functions (i.e. stateless).
     */
    interface Strategy extends Serializable {
        /**
         * Bulk counterpart of {@link #put}: adds every element of
         * {@code objectlist} to the given bit array. No per-element result is
         * reported.
         * <p>
         * NOTE(review): the name suggests the writes are sent through a Redis
         * pipeline; confirm against the implementing strategy.
         */
        <T> void pipelinePut(List<T> objectlist, Funnel<? super T> funnel, int numHashFunctions, RedisBitmaps bits);

        /**
         * Sets {@code numHashFunctions} bits of the given bit array, by hashing
         * a user element.
         * <p>
         * Returns whether any bits changed as a result of this operation.
         */
        <T> boolean put(T object, Funnel<? super T> funnel, int numHashFunctions, RedisBitmaps bits);

        /**
         * Queries {@code numHashFunctions} bits of the given bit array, by
         * hashing a user element; returns {@code true} if and only if all
         * selected bits are set.
         */
        <T> boolean mightContain(T object, Funnel<? super T> funnel, int numHashFunctions, RedisBitmaps bits);

        /**
         * Identifier used to encode this strategy, when marshalled as part of a
         * BloomFilter. Only values in the [-128, 127] range are valid for the
         * compact serial form. Non-negative values are reserved for enums
         * defined in BloomFilterStrategies; negative values are reserved for
         * any custom, stateful strategy we may define (e.g. any kind of
         * strategy that would depend on user input).
         */
        int ordinal();
    }

    /** The bit set of the BloomFilter (not necessarily power of 2!) */
    private final RedisBitmaps      bitmap;

    /** Number of hashes per element */
    private final int               numHashFunctions;

    /** The funnel to translate Ts to bytes */
    private final Funnel<? super T> funnel;

    /**
     * The strategy we employ to map an element T to {@code numHashFunctions}
     * bit indexes.
     */
    private final Strategy          strategy;

    /**
     * Creates a BloomFilter over an already constructed bit set.
     *
     * @param bitmap the bit set backing this filter; must not be null
     * @param numHashFunctions number of hashes per element; must be in
     *            {@code [1, 255]}
     * @param funnel the funnel used to feed elements to the strategy; must not
     *            be null
     * @param strategy the hashing strategy; must not be null
     * @throws IllegalArgumentException if {@code numHashFunctions} is out of
     *             range
     * @throws NullPointerException if any reference argument is null
     */
    private RedisBloomFilter(RedisBitmaps bitmap, int numHashFunctions, Funnel<? super T> funnel,
                             Strategy strategy) {
        checkArgument(numHashFunctions > 0, "numHashFunctions (%s) must be > 0", numHashFunctions);
        checkArgument(numHashFunctions <= 255, "numHashFunctions (%s) must be <= 255", numHashFunctions);
        this.bitmap = checkNotNull(bitmap);
        this.numHashFunctions = numHashFunctions;
        this.funnel = checkNotNull(funnel);
        this.strategy = checkNotNull(strategy);
    }

    /**
     * Returns {@code true} if the element <i>might</i> have been put in this
     * Bloom filter, {@code false} if this is <i>definitely</i> not the case.
     */
    public boolean mightContain(T object) {
        return strategy.mightContain(object, funnel, numHashFunctions, bitmap);
    }

    /**
     * @deprecated Provided only to satisfy the {@link Predicate} interface; use
     *             {@link #mightContain} instead.
     */
    @Deprecated
    @Override
    public boolean apply(T input) {
        return mightContain(input);
    }

    /**
     * Puts an element into this {@code BloomFilter}. Ensures that subsequent
     * invocations of {@link #mightContain(Object)} with the same element will
     * always return {@code true}.
     *
     * @return true if the bloom filter's bits changed as a result of this
     *         operation. If the bits changed, this is <i>definitely</i> the
     *         first time {@code object} has been added to the filter. If the
     *         bits haven't changed, this <i>might</i> be the first time
     *         {@code object} has been added to the filter. Note that
     *         {@code put(t)} always returns the <i>opposite</i> result to what
     *         {@code mightContain(t)} would have returned at the time it is
     *         called.
     * @since 12.0 (present in 11.0 with {@code void} return type)
     */
    public boolean put(T object) {
        return strategy.put(object, funnel, numHashFunctions, bitmap);
    }

    /**
     * Puts every element of {@code listobject} into this {@code BloomFilter}
     * by delegating to {@link Strategy#pipelinePut}. Unlike
     * {@link #put(Object)}, no indication of whether any bits changed is
     * returned.
     *
     * @param listobject the elements to add
     */
    public void pipelinePut(List<T> listobject) {
        strategy.pipelinePut(listobject, funnel, numHashFunctions, bitmap);
    }

    /**
     * Returns the probability that {@linkplain #mightContain(Object)} will
     * erroneously return {@code true} for an object that has not actually been
     * put in the {@code BloomFilter}. It is computed as the fraction of set
     * bits raised to the power of the number of hash functions.
     * <p>
     * Ideally, this number should be close to the {@code fpp} parameter passed
     * in {@linkplain #create(String, Funnel, int, double)}, or smaller. If it
     * is significantly higher, it is usually the case that too many elements
     * (more than expected) have been put in the {@code BloomFilter},
     * degenerating it.
     *
     * @since 14.0 (since 11.0 as expectedFalsePositiveProbability())
     */
    public double expectedFpp() {
        // You down with FPP? (Yeah you know me!) Who's down with FPP? (Every last homie!)
        return Math.pow((double) bitmap.bitCount() / bitSize(), numHashFunctions);
    }

    /**
     * Returns the number of bits in the underlying bit array.
     */
    @VisibleForTesting
    long bitSize() {
        return bitmap.bitSize();
    }

    /**
     * Determines whether a given bloom filter is compatible with this bloom
     * filter. For two bloom filters to be compatible, they must:
     * <ul>
     * <li>not be the same instance
     * <li>have the same number of hash functions
     * <li>have the same bit size
     * <li>have the same strategy
     * <li>have equal funnels
     * </ul>
     *
     * @param that The bloom filter to check for compatibility.
     * @throws NullPointerException if {@code that} is null
     * @since 15.0
     */
    public boolean isCompatible(RedisBloomFilter<T> that) {
        checkNotNull(that);
        return (this != that) && (this.numHashFunctions == that.numHashFunctions) && (this.bitSize() == that.bitSize())
                && (this.strategy.equals(that.strategy)) && (this.funnel.equals(that.funnel));
    }

    /**
     * Two filters are equal when they have the same number of hash functions
     * and equal funnels, bitmaps and strategies.
     */
    @Override
    public boolean equals(@Nullable Object object) {
        if (object == this) {
            return true;
        }
        if (object instanceof RedisBloomFilter) {
            RedisBloomFilter<?> that = (RedisBloomFilter<?>) object;
            return this.numHashFunctions == that.numHashFunctions && this.funnel.equals(that.funnel)
                    && this.bitmap.equals(that.bitmap) && this.strategy.equals(that.strategy);
        }
        return false;
    }

    /** Hash code derived from the same four components that {@link #equals} compares. */
    @Override
    public int hashCode() {
        return Objects.hashCode(numHashFunctions, funnel, strategy, bitmap);
    }

    /**
     * Creates a {@code RedisBloomFilter<T>} with the expected number of
     * insertions and expected false positive probability.
     * <p>
     * Note that overflowing a {@code BloomFilter} with significantly more
     * elements than specified, will result in its saturation, and a sharp
     * deterioration of its false positive probability.
     * <p>
     * It is recommended that the funnel be implemented as a Java enum. This has
     * the benefit of ensuring proper serialization and deserialization, which
     * is important since {@link #equals} compares funnels with
     * {@code equals}.
     * <p>
     * This overload passes {@code null} as the {@link JedisPool}.
     * NOTE(review): confirm that {@link RedisBitmaps} tolerates a null pool;
     * otherwise use
     * {@link #create(String, Funnel, long, double, JedisPool)}.
     *
     * @param key the key handed to {@link RedisBitmaps} (presumably the Redis
     *            key under which the bits are stored; confirm against that
     *            class)
     * @param funnel the funnel of T's that the constructed
     *            {@code BloomFilter<T>} will use
     * @param expectedInsertions the number of expected insertions to the
     *            constructed {@code BloomFilter<T>}; must be non-negative, and
     *            0 is treated as 1
     * @param fpp the desired false positive probability (must be positive and
     *            less than 1.0)
     * @return a {@code BloomFilter}
     */
    public static <T> RedisBloomFilter<T> create(String key, Funnel<? super T> funnel, int expectedInsertions,
                                                 double fpp) {
        return create(key, funnel, (long) expectedInsertions, fpp, null);
    }

    /**
     * Creates a {@code RedisBloomFilter<T>} with the expected number of
     * insertions and expected false positive probability, using the
     * {@code MURMUR128_MITZ_64} strategy.
     * <p>
     * Note that overflowing a {@code BloomFilter} with significantly more
     * elements than specified, will result in its saturation, and a sharp
     * deterioration of its false positive probability.
     * <p>
     * It is recommended that the funnel be implemented as a Java enum. This has
     * the benefit of ensuring proper serialization and deserialization, which
     * is important since {@link #equals} compares funnels with
     * {@code equals}.
     *
     * @param key the key handed to {@link RedisBitmaps} (presumably the Redis
     *            key under which the bits are stored; confirm against that
     *            class)
     * @param funnel the funnel of T's that the constructed
     *            {@code BloomFilter<T>} will use
     * @param expectedInsertions the number of expected insertions to the
     *            constructed {@code BloomFilter<T>}; must be non-negative, and
     *            0 is treated as 1
     * @param fpp the desired false positive probability (must be positive and
     *            less than 1.0)
     * @param jedisPool the pool handed to {@link RedisBitmaps}
     * @return a {@code BloomFilter}
     * @since 19.0
     */
    public static <T> RedisBloomFilter<T> create(String key, Funnel<? super T> funnel, long expectedInsertions,
                                                 double fpp, JedisPool jedisPool) {
        return create(key, funnel, expectedInsertions, fpp, BloomFilterStrategies.MURMUR128_MITZ_64, jedisPool);
    }

    /**
     * Creates a {@code RedisBloomFilter<T>} with an explicit hashing strategy.
     * The number of bits and the number of hash functions are derived from
     * {@code expectedInsertions} and {@code fpp} via
     * {@link #optimalNumOfBits(long, double)} and
     * {@link #optimalNumOfHashFunctions(long, long)}.
     *
     * @param key the key handed to {@link RedisBitmaps}
     * @param funnel the funnel of T's that the constructed filter will use;
     *            must not be null
     * @param expectedInsertions the number of expected insertions; must be
     *            non-negative, and 0 is treated as 1
     * @param fpp the desired false positive probability (must be positive and
     *            less than 1.0)
     * @param strategy the hashing strategy; must not be null
     * @param jedisPool the pool handed to {@link RedisBitmaps}
     * @return a {@code BloomFilter}
     * @throws IllegalArgumentException if an argument is out of range, or if
     *             the bit set or the filter cannot be created for the computed
     *             number of bits
     * @throws NullPointerException if {@code funnel} or {@code strategy} is
     *             null
     */
    @VisibleForTesting
    public static <T> RedisBloomFilter<T> create(String key, Funnel<? super T> funnel, long expectedInsertions,
                                                 double fpp, Strategy strategy, JedisPool jedisPool) {
        checkNotNull(funnel);
        checkArgument(expectedInsertions >= 0, "Expected insertions (%s) must be >= 0", expectedInsertions);
        checkArgument(fpp > 0.0, "False positive probability (%s) must be > 0.0", fpp);
        checkArgument(fpp < 1.0, "False positive probability (%s) must be < 1.0", fpp);
        checkNotNull(strategy);

        // Size the filter for at least one element so the formulas below never divide by zero.
        if (expectedInsertions == 0) {
            expectedInsertions = 1;
        }

        /*
         * TODO(user): Put a warning in the javadoc about tiny fpp values, since
         * the resulting size is proportional to -log(p), but there is not much
         * of a point after all, e.g. optimalM(1000, 0.0000000000000001) = 76680
         * which is less than 10kb. Who cares!
         */
        long numBits = optimalNumOfBits(expectedInsertions, fpp);
        int numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits);
        try {
            // The bit set is built inside the try so that a rejected size is reported
            // with the same "Could not create BloomFilter" context as a rejected filter.
            RedisBitmaps bitmap = new RedisBitmaps(jedisPool, key, numBits);
            return new RedisBloomFilter<T>(bitmap, numHashFunctions, funnel, strategy);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Could not create BloomFilter of " + numBits + " bits", e);
        }
    }

    /**
     * Creates a {@code RedisBloomFilter<T>} with the expected number of
     * insertions and a default expected false positive probability of 3%.
     * <p>
     * Note that overflowing a {@code BloomFilter} with significantly more
     * elements than specified, will result in its saturation, and a sharp
     * deterioration of its false positive probability.
     * <p>
     * It is recommended that the funnel be implemented as a Java enum. This has
     * the benefit of ensuring proper serialization and deserialization, which
     * is important since {@link #equals} compares funnels with
     * {@code equals}.
     * <p>
     * This overload ends up passing {@code null} as the {@link JedisPool}.
     * NOTE(review): confirm that {@link RedisBitmaps} tolerates a null pool;
     * otherwise use
     * {@link #create(String, Funnel, long, double, JedisPool)}.
     *
     * @param key the key handed to {@link RedisBitmaps} (presumably the Redis
     *            key under which the bits are stored; confirm against that
     *            class)
     * @param funnel the funnel of T's that the constructed
     *            {@code BloomFilter<T>} will use
     * @param expectedInsertions the number of expected insertions to the
     *            constructed {@code BloomFilter<T>}; must be non-negative, and
     *            0 is treated as 1
     * @return a {@code BloomFilter}
     */
    public static <T> RedisBloomFilter<T> create(String key, Funnel<? super T> funnel, int expectedInsertions) {
        return create(key, funnel, (long) expectedInsertions);
    }

    /**
     * Creates a {@code RedisBloomFilter<T>} with the expected number of
     * insertions and a default expected false positive probability of 3%.
     * <p>
     * Note that overflowing a {@code BloomFilter} with significantly more
     * elements than specified, will result in its saturation, and a sharp
     * deterioration of its false positive probability.
     * <p>
     * It is recommended that the funnel be implemented as a Java enum. This has
     * the benefit of ensuring proper serialization and deserialization, which
     * is important since {@link #equals} compares funnels with
     * {@code equals}.
     * <p>
     * This overload passes {@code null} as the {@link JedisPool}.
     * NOTE(review): confirm that {@link RedisBitmaps} tolerates a null pool;
     * otherwise use
     * {@link #create(String, Funnel, long, double, JedisPool)}.
     *
     * @param key the key handed to {@link RedisBitmaps} (presumably the Redis
     *            key under which the bits are stored; confirm against that
     *            class)
     * @param funnel the funnel of T's that the constructed
     *            {@code BloomFilter<T>} will use
     * @param expectedInsertions the number of expected insertions to the
     *            constructed {@code BloomFilter<T>}; must be non-negative, and
     *            0 is treated as 1
     * @return a {@code BloomFilter}
     * @since 19.0
     */
    public static <T> RedisBloomFilter<T> create(String key, Funnel<? super T> funnel, long expectedInsertions) {
        return create(key, funnel, expectedInsertions, 0.03, null); // FYI, for 3%, we always get 5 hash functions
    }

    // Cheat sheet:
    //
    // m: total bits
    // n: expected insertions
    // b: m/n, bits per insertion
    // p: expected false positive probability
    //
    // 1) Optimal k = b * ln2
    // 2) p = (1 - e ^ (-kn/m))^k
    // 3) For optimal k: p = 2 ^ (-k) ~= 0.6185^b
    // 4) For optimal k: m = -nlnp / ((ln2) ^ 2)

    /**
     * Computes the optimal k (number of hashes per element inserted in Bloom
     * filter), given the expected insertions and total number of bits in the
     * Bloom filter. See
     * http://en.wikipedia.org/wiki/File:Bloom_filter_fp_probability.svg for the
     * formula.
     *
     * @param n expected insertions (must be positive)
     * @param m total number of bits in Bloom filter (must be positive)
     * @return {@code m / n * ln 2} rounded to the nearest integer, but never
     *         less than 1
     */
    @VisibleForTesting
    static int optimalNumOfHashFunctions(long n, long m) {
        // (m / n) * log(2), but avoid truncation due to division!
        return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
    }

    /**
     * Computes m (total bits of Bloom filter) which is expected to achieve, for
     * the specified expected insertions, the required false positive
     * probability. See http://en.wikipedia.org/wiki/Bloom_filter#
     * Probability_of_false_positives for the formula.
     *
     * @param n expected insertions (must be positive)
     * @param p false positive rate (must be 0 < p < 1)
     * @return {@code -n * ln p / (ln 2)^2} truncated to a {@code long}
     */
    @VisibleForTesting
    static long optimalNumOfBits(long n, double p) {
        // log(0) is -Infinity; substitute the smallest positive double instead.
        if (p == 0) {
            p = Double.MIN_VALUE;
        }
        return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
    }

}
